Codetopia 的週末,空氣中不只有鹹鹹的海風,還瀰漫著一股...混亂的氣息。沒錯,萬眾矚目的「海港音樂祭」即將登場!但在此之前,一場風暴已在市府內部悄悄上演。
想像一下這個畫面:
Rhea|道路維修隊領班,正對著對講機大喊:「A 街給我封起來!立刻!」
Zoe|通知平台工程師,同時在 Slack 上敲字:「B 區的改道簡訊準備發送!」
Owen|交通資料工程師,盯著模擬數據,眉頭緊鎖:「等等!模型顯示 A 街封閉會造成回堵,快調整參數!」
Felix|監控工程師,緊急插播:「回報!舞台區的臨時電力接點訊號異常,監控優先權需要提高!」
他們每個人都在為音樂祭努力,但溝通方式卻是最原始的網狀結構——私訊、臨時電話、開了又散的 ad-hoc 會議。命令互相打架,資源互相搶奪。最慘的是,出現了「你以為他撤了,但他以為你先撤了」的經典甩鍋盲點。砰!一聲令下,市府高層的耐心終於告罄:「三十分鐘內,給我拉起一個協調中心!所有人,禁止再直接摳對方!」
🧭 術語卡(今日會用到)
GoF|Mediator (中介者):用一個中介物件來封裝一系列的物件互動。中介者使各物件不需要明確地相互引用,從而使其耦合鬆散,而且可以獨立地改變它們之間的互動。
EIP/EDA|Orchestrator vs Choreography:
Orchestrator
(協同指揮) 像個樂團指揮,由中心節點發號施令,決定流程順序與邏輯。Choreography
(事件編舞) 則像舞池裡的舞者,沒有指揮,大家各自監聽事件並做出反應。今天,我們需要的是一位指揮家!MAS|Coordinator Agent + DF:在多代理系統中,由「協調者代理」向「黃頁服務 (Directory Facilitator)」查詢有哪些代理具備特定能力,然後主動發起協商,達成任務目標。
讓我們把時間倒回高層發飆前的十分鐘。那簡直是專案管理的災難片現場:Slack 上開了 47 個跟音樂祭有關的 side-channels、三條電話會議同時在線、四張互相重疊的甘特圖在螢幕上閃爍。
Rhea 剛把 A 街的封鎖線拉起來,Owen 的動態路網模型還在運算,根本來不及同步;另一邊的 Zoe 已經按下發送鍵,將「請改道 B 區」的簡訊推播給了上萬市民;Felix 此時又回報,因為電力問題,A 街的管制必須再延長兩小時...
每個人都在「微調」別人的決策,結果就是一場集體參與的「震盪迴路」。沒有人知道全局,也沒有人能下最終決定。
核心壞味道 👃:
平息這場混亂的唯一解法,就是派一位王牌交通警察登場——Mediator (協調中心)。
一句話概括:把多對多的網狀互動,收斂成一對多的星狀結構。
所有局處單位(我們稱之為 Colleagues
,同事們)不再彼此對話,他們只跟「協調中心」溝通。所有排程、資源仲裁、衝突解決、甚至事後補償,都由協調中心統一處理和下達。
✅ 互動關係複雜且易變:當一群物件之間的溝通方式錯綜複雜,像蜘蛛網一樣時。特別適用於活動、災防、突發狀況這種臨時性任務編組。
✅ 需要一致的決策順序與衝突仲裁:就像這次的音樂祭,封路、通知、用電有嚴格的先後順序和資源排他性,必須有個裁判。
✅ 想隱藏實作細節:每個單位只需要對外暴露「意圖」(我想封路),而不需要把自己的 API 細節(block_road(street, start_time, end_time)
)暴露給所有其他人。
⛔ 流程是線性的:如果任務只是簡單的 A→B→C 逐級處理,那用我們之前提過的 責任鏈模式 (Chain of Responsibility) 就夠了,殺雞焉用牛刀。
⛔ 只是想替換演算法:如果只是想在不同情境下切換不同的處理策略,用 策略模式 (Strategy) 更輕巧。
⛔ 僅需單向廣播:如果只是單純的一對多事件通知,且發送者不關心接收者的後續反應,用 觀察者模式 (Observer) 實現鬆耦合更佳。
⛔ 自治的事件舞蹈:如果各系統能透過監聽事件來自發性地完成協作,且不需要中心化的仲裁,那麼選擇 Choreography
(事件編舞) 會讓系統更有彈性。
⛔ 處理長期業務流程:對於需要跨越數日、包含人工審批、或需要持久化的複雜工作流(例如:為期三天的完整活動規劃與資源調度),應交給更專業的 Workflow Engine 或 Saga 模式處理。Mediator 專注於即時、短生命週期的協調。
一句話對位總結:
Mediator: 多方協調中心
Observer: 單向事件廣播
Chain of Responsibility: 線性逐級審批
Strategy: 封裝可替換演算法
導播,鏡頭拉一下!讓我們用三個不同焦段來看看這個「協調中心」的設計藍圖。
視角 | 觀念/模式 | 在 Codetopia 的說法 |
---|---|---|
微觀 (GoF) | Mediator 封裝了 Colleague 之間的互動 | 音樂祭調度中心 (FestivalMediator) 統一處理道路隊、通知、監控等單位的請求。 |
中觀 (EIP/EDA) | Orchestrator 依序執行,處理補償 | 協同指揮官 收到「場佈」指令,依序執行:① 封路 → ② 推播通知 → ③ 開啟監控,並規劃好失敗時的補償路徑。 |
宏觀 (MAS) | Coordinator Agent 透過 DF 查詢能力並進行協商 | 協調者代理 向黃頁查詢:「誰能在 T1 時段封路且可撤回?」然後發送包含時窗和補償條款的協商指令。 |
微觀 GoF 結構圖:
註:此處 coordinate 負責接收意圖。補償邏輯則封裝在各單位執行的 Command 物件中 (參考 Day 17),由 Mediator 依事件結果決定是否派發補償命令。
中觀 EIP/EDA 資訊流:
Sasha|Event Ops 協調官,臨危受命。她深吸一口氣,拋棄了原本混亂的程式碼,採用了事件驅動、命令模式與資源租約的健壯架構。
from __future__ import annotations
from abc import ABC, abstractmethod
import collections
import time
import uuid
# --- Command Pattern (as per Day 17) ---
class Command(ABC):
@abstractmethod
def execute(self): pass
@abstractmethod
def undo(self): pass
# --- Interfaces and Base Classes ---
class Mediator(ABC):
@abstractmethod
def coordinate(self, sender: object, intent: str, payload: dict = None) -> None: pass
class Colleague:
def __init__(self, name: str) -> None:
self.name = name
self._mediator = None
def set_mediator(self, mediator: Mediator): self._mediator = mediator
def send_intent(self, intent: str, payload: dict = None):
if self._mediator: self._mediator.coordinate(self, intent, payload)
# --- Concrete Colleagues ---
class RoadCrew(Colleague):
def block_road(self, road_id: str, ctx: dict):
print(f"🚧 {self.name}: Executing command to block road {road_id}.")
self.send_intent("road_blocked", ctx)
def clear_road(self, road_id: str, ctx: dict):
print(f"✅ {self.name}: Executing UNDO to clear road {road_id}.")
self.send_intent("road_cleared", ctx)
class NotificationService(Colleague):
def send_alert(self, message: str, should_fail: bool, ctx: dict):
print(f"📢 {self.name}: Executing command to send alert -> '{message}'")
if should_fail:
print(f"🔥 {self.name}: API call failed!")
self.send_intent("notification_failed", ctx)
else: self.send_intent("notification_sent", ctx)
class MonitoringService(Colleague):
def start_monitoring(self, area: str, ctx: dict):
print(f"📡 {self.name}: Executing command to monitor area {area}.")
self.send_intent("monitoring_started", ctx)
# --- Concrete Commands ---
class BlockRoadCommand(Command):
def __init__(self, receiver: RoadCrew, road_id: str, ctx: dict):
self.receiver, self.road_id, self.ctx = receiver, road_id, ctx
def execute(self): self.receiver.block_road(self.road_id, self.ctx)
def undo(self): self.receiver.clear_road(self.road_id, self.ctx)
class SendAlertCommand(Command):
def __init__(self, receiver: NotificationService, message: str, fail: bool, ctx: dict):
self.receiver, self.message, self.fail, self.ctx = receiver, message, fail, ctx
def execute(self): self.receiver.send_alert(self.message, self.fail, self.ctx)
def undo(self): print(f"↩️ UNDO for SendAlert is a no-op or requires a retraction API call.")
# --- Supporting Services ---
class LeaseManager:
def __init__(self): self._leases = {} # resource -> (holder, expiry_time)
def acquire(self, resource: str, holder: str, ttl_seconds: int = 30) -> bool:
now = time.time()
if resource in self._leases and self._leases[resource][1] > now: return False
self._leases[resource] = (holder, now + ttl_seconds); return True
def release(self, resource: str, holder: str) -> bool:
if self._leases.get(resource, (None, 0))[0] == holder:
self._leases.pop(resource, None); return True
return False
def renew(self, resource: str, holder: str, ttl_seconds: int = 30) -> bool:
"""Extend a lease if the same holder still owns it."""
now = time.time()
if self._leases.get(resource, (None, 0))[0] != holder:
return False
self._leases[resource] = (holder, now + ttl_seconds)
return True
# --- Concrete Mediator (Production Grade) ---
class FestivalMediator(Mediator):
def __init__(self) -> None:
self._colleagues = {}
self._event_queue = collections.deque()
self._is_processing = False
self._seq_counter = 0
self._command_history = collections.defaultdict(list)
self._lease_manager = LeaseManager()
self._log = []
self._seen_keys = set()
def register(self, colleague: Colleague, *capabilities: str):
"""Register a colleague under one or more capabilities and wire mediator back-reference."""
colleague.set_mediator(self)
for cap in capabilities:
self._colleagues.setdefault(cap, []).append(colleague)
def pick(self, capability: str) -> Colleague: return self._colleagues[capability][0]
def coordinate(self, sender: object, intent: str, payload: dict = None) -> None:
self._seq_counter += 1
event = (self._seq_counter, sender, intent, payload or {})
print(f"📥 Event #{event[0]} received: '{intent}'. Queued.")
self._event_queue.append(event)
self._process_queue()
def _execute_command(self, command: Command, saga_id: str):
self._command_history[saga_id].append(command)
command.execute()
def _compensate(self, saga_id: str):
print(f"--- ⚠️ Initiating Compensation for Saga {saga_id} ---")
history = self._command_history.pop(saga_id, [])
for command in reversed(history): command.undo()
def _process_queue(self):
if self._is_processing: return
self._is_processing = True
while self._event_queue:
seq, sender, intent, payload = self._event_queue.popleft()
# 冪等性保護 & 決策日誌
idempotency_key = (intent, tuple(sorted(payload.items())))
if idempotency_key in self._seen_keys: continue
self._seen_keys.add(idempotency_key)
self._log.append({"seq": seq, "intent": intent, "payload": payload})
print(f"🧠 Processing event #{seq}: '{intent}'...")
# Accept both flat ctx ({"saga_id": ...}) and nested payload["ctx"]["saga_id"]
saga_id = payload.get("saga_id") or payload.get("ctx", {}).get("saga_id")
if intent == "initiate_setup":
ctx = {"saga_id": f"setup-{uuid.uuid4().hex[:6]}", "road_id": payload["road_id"]}
cmd = BlockRoadCommand(self.pick("road_blocker"), payload["road_id"], ctx)
self._execute_command(cmd, ctx["saga_id"])
elif intent == "road_blocked":
ctx = payload
msg = f"Road {ctx['road_id']} is closed."
fail = payload.get("simulate_failure", False)
cmd = SendAlertCommand(self.pick("alerter"), msg, fail, ctx)
self._execute_command(cmd, saga_id)
elif intent == "notification_sent":
if self._lease_manager.acquire("temp_power", saga_id):
print("💡 Lease acquired for temp_power.")
self.pick("monitor").start_monitoring("Stage A", payload)
else:
print("⏳ Power busy. Re-queuing monitoring task.")
# Add bounded retry counter to avoid idempotency de-dup
payload = dict(payload)
payload["attempt"] = payload.get("attempt", 0) + 1
if payload["attempt"] <= 3:
self.coordinate(self, "notification_sent", payload) # 重試 (最多 3 次)
else:
print("🛑 Monitoring skipped after max retries due to power contention.")
elif intent == "monitoring_started":
print("--- ✅ Festival Setup Sequence Complete ---")
# Release temp power lease on completion
if saga_id:
self._lease_manager.release("temp_power", saga_id)
self.coordinate(self, "setup_complete", payload)
elif intent == "notification_failed":
self._compensate(saga_id)
self._is_processing = False
# --- Client Code ---
if __name__ == "__main__":
mediator = FestivalMediator()
mediator.register(RoadCrew("RoadCrew-1"), "road_blocker")
mediator.register(NotificationService("Notifier-A"), "alerter")
mediator.register(MonitoringService("Monitor-Main"), "monitor")
print("\n--- SCENARIO 1: Successful Flow ---")
mediator.coordinate(None, "initiate_setup", {"road_id": "A"})
print("\n\n--- SCENARIO 2: Failure and Compensation Flow ---")
payload = {"road_id": "B", "simulate_failure": True}
mediator.coordinate(None, "initiate_setup", payload)
強化補償邏輯:目前的 SendAlertCommand
的 undo()
只是印出訊息。請實作一個更有意義的補償:發送一則「更正啟事」的通知。這需要 NotificationService
新增一個 send_retraction
方法。
租約續期 (Renew):在 LeaseManager
中加入 renew(resource, holder)
方法。修改 FestivalMediator
,讓它在 monitoring_started
事件後,如果流程尚未結束,能為 temp_power
的租約續期,避免因長時間監控導致租約過期。
🚩 協調中心變成「超級上帝類」:Mediator 應該只負責「協調」,而不是把所有業務邏輯、資料處理、執行細節全攬在身上。如果你的 Mediator 程式碼超過千行,那它可能已經變成萬能的上帝了。
🚩 同事偷跑,繞過協調中心:如果在程式碼中看到某個 Colleague 物件,竟然還保留著對另一個 Colleague 的直接引用,並且呼叫了它的方法,這就破壞了 Mediator 的初衷。
🚩 只有命令,沒有善後:一個好的協調中心不僅要會發號施令,更要處理失敗時的補償 (Compensation) 機制。只管殺,不管埋,是典型的不負責任 Mediator。
🚩 錯把長期流程當協調:Mediator 適合處理短期的、緊密的互動。對於跨越多天、多個 session 的長期業務流程,應該交給更專業的 Saga 模式或流程引擎 (Workflow Engine)。
突發狀況!開場前一小時,氣象局發布豪雨特報,同時,入口處人流瞬間暴增三倍。身為總指揮的你,會選擇:
A. 序列化處理:由 Mediator 立即下達一連串指令:「暫停入場 -> 推播延期通知 -> 開啟備用避雨通道 -> 重新計算人流模型」。所有動作嚴格按照順序執行。
B. 改為事件舞蹈:Mediator 發出一個「緊急狀況」事件,讓各子系統(入場、通知、通道、人流)各自監聽並獨立做出最快反應。
請留言選擇 A 或 B,並用一句話說明你的理由!
契約測試 (Contract Testing):編寫一個測試,確保任何 Colleague
的實例都沒有直接引用另一個 Colleague
的實例。可以使用 Mock 或測試替身來攔截非法呼叫,一旦發生直接呼叫就讓測試失敗。
補償與順序測試 (Compensation & Order Testing):設計一個情境測試,模擬「推播服務延遲後才成功」的狀況,驗證補償邏輯 不會 被錯誤觸發。再設計一個「重試三次後才成功」的情境,驗證最終流程的正確性。
資源租約測試 (Lease Testing):針對 LeaseManager
編寫單元測試,驗證租約在 TTL 後確實會過期,以及當一個持有者釋放資源後,排隊等候者能成功獲取資源。
EIP/EDA (中觀):今天的 FestivalMediator
就是一個 Orchestrator
。在更大型的系統中,Orchestrator
只負責定義流程順序與仲裁邏輯,實際的命令會透過一個共享的 Command Bus 來派發。而流程的執行狀態,則可以透過 Iterator 模式來做巡檢與回放,這與 Day 18 的精神一致。
MAS (宏觀):我們可以把「封路」、「推播」、「監控」這些行為,抽象化為「能力 (Capability)」。協調者代理 (Coordinator Agent) 會先向黃頁 (DF) 查詢目前有哪些代理具備這些能力,然後才根據任務需求,動態地與它們進行協商。決策邏輯可以更複雜,例如基於拓撲排序 (Topological Sort)、優先級和時間窗口,並允許不互相競爭資源的步驟並行執行。
多方別硬聊,交給協調;序與補償,一口出清。
今天的海港音樂祭總算在 Sasha 的指揮下步上軌道。但新的問題來了,臨時交管的狀態有「準備中」、「已生效」、「已解除」、「緊急暫停」等多種模式,這些狀態之間的轉換邏輯如果都寫在 if-else
裡,恐怕又是一場災難...
明日預告:Day 20|State(狀態機):是時候把臨時管制的「紅黃綠」燈號誌化,讓狀態轉移的邏輯不再四處散落!
為了確保在不支援 Mermaid 渲染的環境中也能正常閱讀,以下提供文中圖表的 ASCII 替代版本:
┌─────────────────┐
│ <<interface>> │
│ Mediator │
│─────────────────│
│ +coordinate() │
└─────────┬───────┘
│
▲ implements
│
┌─────────┴───────┐
│ FestivalMediator │
│─────────────────│
│ -colleagues: map │
│ +register() │
│ +coordinate() │
└─────────┬───────┘
│
│ manages
▼
┌──────────────────────────────────────────────────────────┐
│ │
▼ ▼ ▼ │
┌─────────┐ ┌─────────────┐ ┌─────────────┐
│RoadCrew │ │Notification │ │ Monitoring │
│ │ │ Service │ │ Service │
│─────────│ │─────────────│ │─────────────│
│+execute()│ │ +execute() │ │ +execute() │
└─────────┘ └─────────────┘ └─────────────┘
│ │ │
└─────────────────────────┼─────────────────────────┘
▲
inherits from Colleague
│
┌─────────────────┐
│ <<abstract>> │
│ Colleague │
│─────────────────│
│ -mediator │
│ +send_intent() │
└─────────────────┘
Sasha FestivalMediator RoadCrew NotificationService MonitoringService
│ │ │ │ │
│ initiate_ │ │ │ │
│ setup │ │ │ │
│──────────────>│ │ │ │
│ │ execute( │ │ │
│ │ BlockRoad │ │ │
│ │ Command) │ │ │
│ │────────────────>│ │ │
│ │ │ road_blocked │ │
│ │<────────────────│ │ │
│ │ execute( │ │ │
│ │ SendAlert │ │ │
│ │ Command) │ │ │
│ │─────────────────────────────────>│ │
│ │ │ notification_ │ │
│ │ │ sent │ │
│ │<─────────────────────────────────│ │
│ │ execute( │ │ │
│ │ StartMonitor │ │ │
│ │ Command) │ │ │
│ │───────────────────────────────────────────────────────>│
│ │ │ │ monitoring_ │
│ │ │ │ started │
│ │<───────────────────────────────────────────────────────│
│ setup_ │ │ │ │
│ complete │ │ │ │
│<──────────────│ │ │ │
【問題】網狀直接耦合: 【解法】星狀協調中心:
RoadCrew ←──────→ NotificationService RoadCrew
│ ╲ ╱ │ │
│ ╲ ╱ │ │
│ ╲ ╱ │ ▼
│ ╱╲ │ ┌─────────────────┐
│ ╱ ╲ │ │ FestivalMediator │
│ ╱ ╲ │ │ 協調中心 │
▼ ╱ ╲ ▼ └─────────────────┘
MonitoringService ▲
│
複雜度: O(n²) │
職責混亂、難以維護 NotificationService
│
▼
MonitoringService
複雜度: O(n)
職責清晰、易於擴展
┌─────────────┐ ┌─────────────────┐ ┌──────────────────┐
│ Event Queue │────>│ Event Processor │────>│ Command History │
│ │ │ │ │ & Saga Log │
│ ┌─────────┐ │ │ ┌─────────────┐ │ │ │
│ │Event #1 │ │ │ │ Idempotency │ │ │ ┌──────────────┐ │
│ │Event #2 │ │ │ │ Check │ │ │ │ Saga: setup- │ │
│ │Event #3 │ │ │ │ │ │ │ │ abc123 │ │
│ └─────────┘ │ │ └─────────────┘ │ │ │ - BlockRoad │ │
│ │ │ │ │ │ │ - SendAlert │ │
└─────────────┘ │ ▼ │ │ │ - Monitor │ │
│ ┌─────────────┐ │ │ └──────────────┘ │
失敗時觸發 │ │ Compensation│ │ │ │
┌───────────────┤ │ Logic │ │ └──────────────────┘
│ │ └─────────────┘ │ ▲
▼ └─────────────────┘ │
┌─────────────┐ │
│ Undo Stack │ Command History
│ │ Referenced
│ Clear Road │<────────────────────────────────────┘
│ (Rollback) │
└─────────────┘